{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 管道组件\n",
    "\n",
    "    转换器：通过方法transform(),在原始数据上添加一列或者多列来将一个数据框转为另一个数据框\n",
    "        1.一个特征转换器输入一个数据框，读取一个文本列，将其映射为新的特征向量列，输出一个新的带有特征向量列的数据框\n",
    "        2.一个学习模型转换器输入一个数据框，读取包括特征向量的列，预测每一个特征向量的标签。输出一个新的带有预测标签的数据框\n",
    "    \n",
    "    估计器：，估计器通过fit()方法，接受一个数据框产生一个模型。比如，逻辑回归就是一个估计器，通过fit()来产生一个逻辑回归模型。\n",
    "    \n",
    "管道的工作原理：\n",
    "\n",
    "     管道由一系列有顺序的阶段指定，每个状态时转换器或估计器。每个状态的运行是有顺序的，输入的数据框通过每个阶段进行改变。在转换器阶段，transform()方法被调用于数据框上。对于估计器阶段，fit()方法被调用来产生一个转换器，然后该转换器的transform()方法被调用在数据框上。\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 估计器、转换器和Param示例："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.linalg import Vectors\n",
    "from pyspark.ml.classification import LogisticRegression\n",
    " \n",
    "# Prepare training data from a list of (label, features) tuples.\n",
    "training = spark.createDataFrame([\n",
    "    (1.0, Vectors.dense([0.0, 1.1, 0.1])),\n",
    "    (0.0, Vectors.dense([2.0, 1.0, -1.0])),\n",
    "    (0.0, Vectors.dense([2.0, 1.3, 1.0])),\n",
    "    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], [\"label\", \"features\"])\n",
    " \n",
    "# Create a LogisticRegression instance. This instance is an Estimator.\n",
    "lr = LogisticRegression(maxIter=10, regParam=0.01)\n",
    "# Print out the parameters, documentation, and any default values.\n",
    "print \"LogisticRegression parameters:\\n\" + lr.explainParams() + \"\\n\"\n",
    " \n",
    "# Learn a LogisticRegression model. This uses the parameters stored in lr.\n",
    "model1 = lr.fit(training)\n",
    " \n",
    "# Since model1 is a Model (i.e., a transformer produced by an Estimator),\n",
    "# we can view the parameters it used during fit().\n",
    "# This prints the parameter (name: value) pairs, where names are unique IDs for this\n",
    "# LogisticRegression instance.\n",
    "print \"Model 1 was fit using parameters: \"\n",
    "print model1.extractParamMap()\n",
    " \n",
    "# We may alternatively specify parameters using a Python dictionary as a paramMap\n",
    "paramMap = {lr.maxIter: 20}\n",
    "paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.\n",
    "paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.\n",
    " \n",
    "# You can combine paramMaps, which are python dictionaries.\n",
    "paramMap2 = {lr.probabilityCol: \"myProbability\"}  # Change output column name\n",
    "paramMapCombined = paramMap.copy()\n",
    "paramMapCombined.update(paramMap2)\n",
    " \n",
    "# Now learn a new model using the paramMapCombined parameters.\n",
    "# paramMapCombined overrides all parameters set earlier via lr.set* methods.\n",
    "model2 = lr.fit(training, paramMapCombined)\n",
    "print \"Model 2 was fit using parameters: \"\n",
    "print model2.extractParamMap()\n",
    " \n",
    "# Prepare test data\n",
    "test = spark.createDataFrame([\n",
    "    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),\n",
    "    (0.0, Vectors.dense([3.0, 2.0, -0.1])),\n",
    "    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], [\"label\", \"features\"])\n",
    " \n",
    "# Make predictions on test data using the Transformer.transform() method.\n",
    "# LogisticRegression.transform will only use the 'features' column.\n",
    "# Note that model2.transform() outputs a \"myProbability\" column instead of the usual\n",
    "# 'probability' column since we renamed the lr.probabilityCol parameter previously.\n",
    "prediction = model2.transform(test)\n",
    "selected = prediction.select(\"features\", \"label\", \"myProbability\", \"prediction\")\n",
    "for row in selected.collect():\n",
    "    print row"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 管道实例"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.classification import LogisticRegression\n",
    "from pyspark.ml.feature import HashingTF, Tokenizer\n",
    " \n",
    "# Prepare training documents from a list of (id, text, label) tuples.\n",
    "training = spark.createDataFrame([\n",
    "    (0, \"a b c d e spark\", 1.0),\n",
    "    (1, \"b d\", 0.0),\n",
    "    (2, \"spark f g h\", 1.0),\n",
    "    (3, \"hadoop mapreduce\", 0.0)], [\"id\", \"text\", \"label\"])\n",
    " \n",
    "# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.\n",
    "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"words\")\n",
    "hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol=\"features\")\n",
    "lr = LogisticRegression(maxIter=10, regParam=0.01)\n",
    "pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])\n",
    " \n",
    "# Fit the pipeline to training documents.\n",
    "model = pipeline.fit(training)\n",
    " \n",
    "# Prepare test documents, which are unlabeled (id, text) tuples.\n",
    "test = spark.createDataFrame([\n",
    "    (4, \"spark i j k\"),\n",
    "    (5, \"l m n\"),\n",
    "    (6, \"mapreduce spark\"),\n",
    "    (7, \"apache hadoop\")], [\"id\", \"text\"])\n",
    " \n",
    "# Make predictions on test documents and print columns of interest.\n",
    "prediction = model.transform(test)\n",
    "selected = prediction.select(\"id\", \"text\", \"prediction\")\n",
    "for row in selected.collect():\n",
    "    print(row)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
